/*
 * Software License Agreement (BSD License)
 *
 *  Copyright (c) 2008, Willow Garage, Inc.
 *  All rights reserved.
 *
 *  Redistribution and use in source and binary forms, with or without
 *  modification, are permitted provided that the following conditions
 *  are met:
 *
 *   * Redistributions of source code must retain the above copyright
 *     notice, this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above
 *     copyright notice, this list of conditions and the following
 *     disclaimer in the documentation and/or other materials provided
 *     with the distribution.
 *   * Neither the name of Willow Garage, Inc. nor the names of its
 *     contributors may be used to endorse or promote products derived
 *     from this software without specific prior written permission.
 *
 *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
 *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
 *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 *  POSSIBILITY OF SUCH DAMAGE.
 */

#ifndef ROSCPP_SESSIONS_H
#define ROSCPP_SESSIONS_H

#include <string>
#include <sstream>
#include <map>
#include <list>
#include <ros/node_handle.h>
#include <ros/service.h>
#include <ros/service_traits.h>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>

namespace ros {
namespace session {

class abstractSession
{
public:
    abstractSession(const std::string session_name, int sessionid) : _sessionid(sessionid), bterminated(false) {
        std::stringstream ss; ss << _sessionid;
        _session_name = _nh.resolveName(session_name);
        size_t pos = _session_name.rfind('/');
        if( pos == std::string::npos )
            _session_dir = "/";
        else
            _session_dir = _session_name.substr(0,pos+1);
        _sessionheader[_session_name] = ss.str();
        _threadasync = boost::thread(boost::bind(&abstractSession::_AsyncWorkerThread, this));
    }

    virtual ~abstractSession() {
        if( !bterminated ) {
            close_connections();
            bterminated = true;
        }

        {
            boost::mutex::scoped_lock lock(async_mutex);
            _condHasAsyncCalls.notify_all();
        }
        _threadasync.join();
    }

    /// call a service part of a session, all services are relative to the session's scope, NOT the
    /// calling node's scope. This enables users to call session services without worrying about
    /// where its services are
    template<class MReq, class MRes>
    bool call(const std::string &service_name, MReq &req, MRes &res, bool bAsync=false)
    {
        if( service_name.size() == 0 ) {
            ROS_WARN("service name empty");
            return false;
        }

        boost::mutex::scoped_lock servlock(serv_mutex);
        if( bterminated ) {
            ROS_WARN("session instance already terminated");
            return false;
        }

        //    if (!isValid())
        //    {
        //      return false;
        //    }

        namespace st = service_traits;
        if (strcmp(st::md5sum(req), st::md5sum(res))) {
            ROS_ERROR("The request and response parameters to the service "
                      "call must be autogenerated from the same "
                      "server definition file (.srv). your service call "
                      "for %s appeared to use request/response types "
                      "from different .srv files. (%s vs. %s)", service_name.c_str(), st::md5sum(req), st::md5sum(res));
            return false;
        }

        //return call(req, res, st::md5sum(req));

        // first find the service in the map
        std::map<std::string, ServiceClient>::iterator itserv = _mapservices.find(service_name);
        ServiceClient handle;

        if( itserv != _mapservices.end() ) {
            if( itserv->second.isValid() ) {
                handle = itserv->second;
            }
            else {
                _mapservices.erase(itserv);
            }
        }

        if( !handle ) {
            // get the global service if service_name is a local path
            std::string global_service_name;
            if( service_name[0] != '/' ) {
                global_service_name = _session_dir + service_name;
            }
            else {
                global_service_name = service_name;
            }
            ros::ServiceClientOptions ops(global_service_name, st::md5sum(req), true, _sessionheader);
            handle = _nh.serviceClient(ops);
            if( !handle ) {
                return false;
            }
            _mapservices[service_name] = handle;
        }

        if( bAsync ) {
            // push onto async thread
            boost::mutex::scoped_lock lock(async_mutex);
            listAsyncCalls.push_back(AsyncCallBasePtr(new AsyncCall<MReq,MRes>(handle, st::md5sum(req), new MReq(req), new MRes(res))));
            return true;
        }

        // call directly
        _flush_async();
        return handle.call(req, res);
    }

    /// get the full session name
    const std::string& GetSessionName() const {
        return _session_name;
    }
    int GetSessionId() const {
        return _sessionid;
    }

    /// close all the persistent service connections (does not terminate the service)
    virtual bool close_connections()
    {
        if( bterminated ) {
            ROS_WARN("session instance already terminated");
            return false;
        }

        boost::mutex::scoped_lock lock(serv_mutex);
        _flush_async();
        _mapservices.clear();

        return true;
    }

    /// terminate the session instance
    virtual bool terminate() = 0;
    virtual bool isterminated() const {
        return bterminated;
    }

private:
    ros::NodeHandle _nh;
    std::string _session_name, _session_dir;
    int _sessionid; ///< unique session id to session_name
    M_string _sessionheader;

    std::map<std::string, ServiceClient> _mapservices;

    class AsyncCallBase
    {
public:
        AsyncCallBase(ros::ServiceClient handle, const std::string& server_md5sum) : _handle(handle), _server_md5sum(server_md5sum) {
        }
        virtual ~AsyncCallBase() {
        }
        virtual bool call() = 0;
        ros::ServiceClient _handle;
        std::string _server_md5sum;
    };
    typedef boost::shared_ptr<AsyncCallBase> AsyncCallBasePtr;

    template<class MReq, class MRes>
    class AsyncCall : public AsyncCallBase
    {
public:
        AsyncCall(ros::ServiceClient handle, const std::string& server_md5sum, MReq* preq, MRes* pres)
            : AsyncCallBase(handle, server_md5sum), req(preq), res(pres) {
        }
        virtual bool call() {
            return _handle.call(*req.get(), *res.get(), _server_md5sum);
        }
        boost::shared_ptr<MReq> req;
        boost::shared_ptr<MRes> res;
    };

    std::list< AsyncCallBasePtr > listAsyncCalls; ///< calls that are running asynchronously waiting for responses
    boost::thread _threadasync; ///< async worker thread

    void _AsyncWorkerThread()
    {
        boost::mutex::scoped_lock lock(async_mutex);

        while(!bterminated) {
            if( listAsyncCalls.size() == 0 ) {
                _condHasAsyncCalls.wait(lock);
            }
            if( bterminated ) {
                break;
            }

            ROS_ASSERT(listAsyncCalls.size()>0);
            if( listAsyncCalls.size() == 0 ) {
                continue;
            }

            AsyncCallBasePtr asynccall = listAsyncCalls.front();
            lock.unlock();

            if( !asynccall->call() ) {
                ROS_WARN("async service call failed\n");
            }

            lock.lock();
            ROS_ASSERT(listAsyncCalls.size() > 0);
            ROS_ASSERT(listAsyncCalls.front()->_handle == asynccall->_handle);
            listAsyncCalls.pop_front();

            if( listAsyncCalls.size() == 0 ) {
                _condAsyncCallsFlushed.notify_all();
            }
        }
    }

protected:
    /// wait for all async messages to complete
    virtual void _flush_async()
    {
        boost::mutex::scoped_lock lock(async_mutex);
        if( listAsyncCalls.size() == 0 ) {
            return;
        }
        _condHasAsyncCalls.notify_all();
        _condAsyncCallsFlushed.wait(async_mutex);
        ROS_ASSERT( listAsyncCalls.size() == 0 );
    }

    boost::mutex serv_mutex, async_mutex;
    boost::condition _condHasAsyncCalls, _condAsyncCallsFlushed;
    bool bterminated; ///< true if session is terminated
};

template<class MReq, class MRes>
class Session : public abstractSession
{
public:
    Session(const std::string _session_name, int _sessionid)
        : abstractSession(_session_name, _sessionid) {
    }
    // optional call copy constructors for request and response messages when message info is not known at compile time
    Session(const std::string _session_name, int _sessionid, const MReq& req, const MRes& res)
        : abstractSession(_session_name, _sessionid), session_req(req), session_res(res) {
    }
    virtual ~Session() {
    }
    virtual bool terminate() {
        if( bterminated ) {
            ROS_WARN("session instance already terminated");
            return false;
        }
        close_connections();
        boost::mutex::scoped_lock lock(serv_mutex);
        session_req.sessionid = GetSessionId();
        bool bsuccess = service::call(GetSessionName(), session_req, session_res);
        bterminated = true;
        return bsuccess;
    }

protected:
    MReq session_req;
    MRes session_res;
};

/// create a session
typedef boost::shared_ptr<abstractSession> abstractSessionHandle;

template<class MReq, class MRes>
abstractSessionHandle create_session(const std::string &session_name, MReq &req, MRes &res)
{
    // no reason to wait
    //if( !ros::service::wait_for_service(session_name) )
    //    return abstractSessionHandle(); // couldn't find the service

    if( !service::call(session_name,req,res) )
        return abstractSessionHandle();

    return abstractSessionHandle(new Session<MReq,MRes>(session_name, res.sessionid, req, res));
}

} // end namespace session
} // end namespace ros

#endif
